mirror of https://github.com/apache/kafka.git
KAFKA-18050 Upgrade the checkstyle version to 10.20.2 (#17999)
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
parent
4362ab7090
commit
2b43c49f51
|
@ -39,7 +39,9 @@
|
|||
<module name="EqualsHashCode"/>
|
||||
<module name="SimplifyBooleanExpression"/>
|
||||
<module name="OneStatementPerLine"/>
|
||||
<module name="UnnecessaryParentheses" />
|
||||
<module name="UnnecessaryParentheses">
|
||||
<property name="tokens" value="IDENT, NUM_DOUBLE, LAMBDA, TEXT_BLOCK_LITERAL_BEGIN, UNARY_MINUS, UNARY_PLUS, INC, DEC, POST_INC, POST_DEC" />
|
||||
</module>
|
||||
<module name="SimplifyBooleanReturn"/>
|
||||
|
||||
<!-- style -->
|
||||
|
|
|
@ -4822,12 +4822,12 @@ public class KafkaAdminClient extends AdminClient {
|
|||
setHost(endpoint.host()).
|
||||
setPort(endpoint.port())));
|
||||
return new AddRaftVoterRequest.Builder(
|
||||
new AddRaftVoterRequestData().
|
||||
setClusterId(options.clusterId().orElse(null)).
|
||||
setTimeoutMs(timeoutMs).
|
||||
setVoterId(voterId) .
|
||||
setVoterDirectoryId(voterDirectoryId).
|
||||
setListeners(listeners));
|
||||
new AddRaftVoterRequestData().
|
||||
setClusterId(options.clusterId().orElse(null)).
|
||||
setTimeoutMs(timeoutMs).
|
||||
setVoterId(voterId) .
|
||||
setVoterDirectoryId(voterDirectoryId).
|
||||
setListeners(listeners));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -44,7 +44,7 @@ public final class OffsetsForLeaderEpochUtils {
|
|||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(OffsetsForLeaderEpochUtils.class);
|
||||
|
||||
private OffsetsForLeaderEpochUtils(){}
|
||||
private OffsetsForLeaderEpochUtils() {}
|
||||
|
||||
static AbstractRequest.Builder<OffsetsForLeaderEpochRequest> prepareRequest(
|
||||
Map<TopicPartition, SubscriptionState.FetchPosition> requestData) {
|
||||
|
|
|
@ -202,14 +202,14 @@ public interface Authorizer extends Configurable, Closeable {
|
|||
|
||||
EnumMap<PatternType, Set<String>> denyPatterns =
|
||||
new EnumMap<>(PatternType.class) {{
|
||||
put(PatternType.LITERAL, new HashSet<>());
|
||||
put(PatternType.PREFIXED, new HashSet<>());
|
||||
}};
|
||||
put(PatternType.LITERAL, new HashSet<>());
|
||||
put(PatternType.PREFIXED, new HashSet<>());
|
||||
}};
|
||||
EnumMap<PatternType, Set<String>> allowPatterns =
|
||||
new EnumMap<>(PatternType.class) {{
|
||||
put(PatternType.LITERAL, new HashSet<>());
|
||||
put(PatternType.PREFIXED, new HashSet<>());
|
||||
}};
|
||||
put(PatternType.LITERAL, new HashSet<>());
|
||||
put(PatternType.PREFIXED, new HashSet<>());
|
||||
}};
|
||||
|
||||
boolean hasWildCardAllow = false;
|
||||
|
||||
|
|
|
@ -2061,9 +2061,9 @@ public class KafkaAdminClientTest {
|
|||
new DescribeConfigsResponseData().setResults(asList(new DescribeConfigsResponseData.DescribeConfigsResult()
|
||||
.setResourceName(brokerResource.name()).setResourceType(brokerResource.type().id()).setErrorCode(Errors.NONE.code())
|
||||
.setConfigs(emptyList()),
|
||||
new DescribeConfigsResponseData.DescribeConfigsResult()
|
||||
.setResourceName(brokerLoggerResource.name()).setResourceType(brokerLoggerResource.type().id()).setErrorCode(Errors.NONE.code())
|
||||
.setConfigs(emptyList())))), env.cluster().nodeById(0));
|
||||
new DescribeConfigsResponseData.DescribeConfigsResult()
|
||||
.setResourceName(brokerLoggerResource.name()).setResourceType(brokerLoggerResource.type().id()).setErrorCode(Errors.NONE.code())
|
||||
.setConfigs(emptyList())))), env.cluster().nodeById(0));
|
||||
Map<ConfigResource, KafkaFuture<Config>> result = env.adminClient().describeConfigs(asList(
|
||||
brokerResource,
|
||||
brokerLoggerResource)).values();
|
||||
|
@ -2102,9 +2102,9 @@ public class KafkaAdminClientTest {
|
|||
new DescribeConfigsResponseData().setResults(asList(new DescribeConfigsResponseData.DescribeConfigsResult()
|
||||
.setResourceName(topic.name()).setResourceType(topic.type().id()).setErrorCode(Errors.NONE.code())
|
||||
.setConfigs(emptyList()),
|
||||
new DescribeConfigsResponseData.DescribeConfigsResult()
|
||||
.setResourceName(unrequested.name()).setResourceType(unrequested.type().id()).setErrorCode(Errors.NONE.code())
|
||||
.setConfigs(emptyList())))));
|
||||
new DescribeConfigsResponseData.DescribeConfigsResult()
|
||||
.setResourceName(unrequested.name()).setResourceType(unrequested.type().id()).setErrorCode(Errors.NONE.code())
|
||||
.setConfigs(emptyList())))));
|
||||
Map<ConfigResource, KafkaFuture<Config>> result = env.adminClient().describeConfigs(singletonList(
|
||||
topic)).values();
|
||||
assertEquals(new HashSet<>(singletonList(topic)), result.keySet());
|
||||
|
|
|
@ -177,11 +177,11 @@ public class ListConsumerGroupOffsetsHandlerTest {
|
|||
Map<TopicPartition, OffsetAndMetadata> offsetAndMetadataMapTwo =
|
||||
Collections.singletonMap(t2p2, new OffsetAndMetadata(10L));
|
||||
Map<String, Map<TopicPartition, OffsetAndMetadata>> expectedResult =
|
||||
new HashMap<String, Map<TopicPartition, OffsetAndMetadata>>() {{
|
||||
put(groupZero, offsetAndMetadataMapZero);
|
||||
put(groupOne, offsetAndMetadataMapOne);
|
||||
put(groupTwo, offsetAndMetadataMapTwo);
|
||||
}};
|
||||
new HashMap<>() {{
|
||||
put(groupZero, offsetAndMetadataMapZero);
|
||||
put(groupOne, offsetAndMetadataMapOne);
|
||||
put(groupTwo, offsetAndMetadataMapTwo);
|
||||
}};
|
||||
|
||||
assertCompletedForMultipleGroups(
|
||||
handleWithPartitionErrorMultipleGroups(Errors.UNKNOWN_TOPIC_OR_PARTITION), expectedResult);
|
||||
|
@ -304,11 +304,11 @@ public class ListConsumerGroupOffsetsHandlerTest {
|
|||
responseDataTwo.put(t2p2, new OffsetFetchResponse.PartitionData(10, Optional.empty(), "", Errors.NONE));
|
||||
|
||||
Map<String, Map<TopicPartition, PartitionData>> responseData =
|
||||
new HashMap<String, Map<TopicPartition, PartitionData>>() {{
|
||||
put(groupZero, responseDataZero);
|
||||
put(groupOne, responseDataOne);
|
||||
put(groupTwo, responseDataTwo);
|
||||
}};
|
||||
new HashMap<>() {{
|
||||
put(groupZero, responseDataZero);
|
||||
put(groupOne, responseDataOne);
|
||||
put(groupTwo, responseDataTwo);
|
||||
}};
|
||||
|
||||
Map<String, Errors> errorMap = errorMap(groups, Errors.NONE);
|
||||
return new OffsetFetchResponse(0, errorMap, responseData);
|
||||
|
|
|
@ -298,8 +298,8 @@ public class ConsumerProtocolTest {
|
|||
if (version >= 1) {
|
||||
assertEquals(
|
||||
Set.of(
|
||||
new ConsumerProtocolSubscription.TopicPartition().setTopic("foo").setPartitions(Collections.singletonList(0)),
|
||||
new ConsumerProtocolSubscription.TopicPartition().setTopic("bar").setPartitions(Collections.singletonList(0)
|
||||
new ConsumerProtocolSubscription.TopicPartition().setTopic("foo").setPartitions(Collections.singletonList(0)),
|
||||
new ConsumerProtocolSubscription.TopicPartition().setTopic("bar").setPartitions(Collections.singletonList(0)
|
||||
)),
|
||||
Set.copyOf(parsedSubscription.ownedPartitions())
|
||||
);
|
||||
|
|
|
@ -1815,10 +1815,10 @@ public class TransactionManagerTest {
|
|||
|
||||
@ParameterizedTest
|
||||
@EnumSource(names = {
|
||||
"UNKNOWN_TOPIC_OR_PARTITION",
|
||||
"REQUEST_TIMED_OUT",
|
||||
"COORDINATOR_LOAD_IN_PROGRESS",
|
||||
"CONCURRENT_TRANSACTIONS"
|
||||
"UNKNOWN_TOPIC_OR_PARTITION",
|
||||
"REQUEST_TIMED_OUT",
|
||||
"COORDINATOR_LOAD_IN_PROGRESS",
|
||||
"CONCURRENT_TRANSACTIONS"
|
||||
})
|
||||
public void testRetriableErrors(Errors error) {
|
||||
// Ensure FindCoordinator retries.
|
||||
|
|
|
@ -907,8 +907,8 @@ public class SelectorTest {
|
|||
}
|
||||
assertNotNull(selector.lowestPriorityChannel());
|
||||
for (int i = conns - 1; i >= 0; i--) {
|
||||
if (i != 2)
|
||||
assertEquals("", blockingRequest(String.valueOf(i), ""));
|
||||
if (i != 2)
|
||||
assertEquals("", blockingRequest(String.valueOf(i), ""));
|
||||
time.sleep(10);
|
||||
}
|
||||
assertEquals("2", selector.lowestPriorityChannel().id());
|
||||
|
|
|
@ -390,16 +390,16 @@ public class OffsetFetchResponseTest {
|
|||
.setErrorCode(Errors.NOT_COORDINATOR.code())
|
||||
.setThrottleTimeMs(throttleTimeMs)
|
||||
.setTopics(Collections.singletonList(
|
||||
new OffsetFetchResponseTopic()
|
||||
.setName(topicOne)
|
||||
.setPartitions(Collections.singletonList(
|
||||
new OffsetFetchResponsePartition()
|
||||
.setPartitionIndex(partitionOne)
|
||||
.setCommittedOffset(offset)
|
||||
.setCommittedLeaderEpoch(RecordBatch.NO_PARTITION_LEADER_EPOCH)
|
||||
.setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code())
|
||||
.setMetadata(metadata))
|
||||
))
|
||||
new OffsetFetchResponseTopic()
|
||||
.setName(topicOne)
|
||||
.setPartitions(Collections.singletonList(
|
||||
new OffsetFetchResponsePartition()
|
||||
.setPartitionIndex(partitionOne)
|
||||
.setCommittedOffset(offset)
|
||||
.setCommittedLeaderEpoch(RecordBatch.NO_PARTITION_LEADER_EPOCH)
|
||||
.setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code())
|
||||
.setMetadata(metadata))
|
||||
))
|
||||
);
|
||||
assertEquals(expectedData, response.data());
|
||||
}
|
||||
|
|
|
@ -2187,9 +2187,9 @@ public class RequestResponseTest {
|
|||
JoinGroupRequestData.JoinGroupRequestProtocolCollection protocols =
|
||||
new JoinGroupRequestData.JoinGroupRequestProtocolCollection(
|
||||
Collections.singleton(
|
||||
new JoinGroupRequestData.JoinGroupRequestProtocol()
|
||||
.setName("consumer-range")
|
||||
.setMetadata(new byte[0])).iterator()
|
||||
new JoinGroupRequestData.JoinGroupRequestProtocol()
|
||||
.setName("consumer-range")
|
||||
.setMetadata(new byte[0])).iterator()
|
||||
);
|
||||
|
||||
JoinGroupRequestData data = new JoinGroupRequestData()
|
||||
|
|
|
@ -167,10 +167,10 @@ public class MirrorSourceConnectorTest {
|
|||
new DefaultReplicationPolicy(), x -> true, getConfigPropertyFilter());
|
||||
assertFalse(connector.shouldReplicateAcl(
|
||||
new AclBinding(new ResourcePattern(ResourceType.TOPIC, "test_topic", PatternType.LITERAL),
|
||||
new AccessControlEntry("kafka", "", AclOperation.WRITE, AclPermissionType.ALLOW))), "should not replicate ALLOW WRITE");
|
||||
new AccessControlEntry("kafka", "", AclOperation.WRITE, AclPermissionType.ALLOW))), "should not replicate ALLOW WRITE");
|
||||
assertTrue(connector.shouldReplicateAcl(
|
||||
new AclBinding(new ResourcePattern(ResourceType.TOPIC, "test_topic", PatternType.LITERAL),
|
||||
new AccessControlEntry("kafka", "", AclOperation.ALL, AclPermissionType.ALLOW))), "should replicate ALLOW ALL");
|
||||
new AccessControlEntry("kafka", "", AclOperation.ALL, AclPermissionType.ALLOW))), "should replicate ALLOW ALL");
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
|
@ -1196,7 +1196,7 @@ public class AbstractHerderTest {
|
|||
keys.putAll(configDef.configKeys());
|
||||
}
|
||||
|
||||
protected void addValue(List<ConfigValue> values, String name, String value, String...errors) {
|
||||
protected void addValue(List<ConfigValue> values, String name, String value, String... errors) {
|
||||
values.add(new ConfigValue(name, value, new ArrayList<>(), Arrays.asList(errors)));
|
||||
}
|
||||
|
||||
|
@ -1211,7 +1211,7 @@ public class AbstractHerderTest {
|
|||
assertNull(info.configKey());
|
||||
}
|
||||
|
||||
protected void assertInfoValue(ConfigInfos infos, String name, String value, String...errors) {
|
||||
protected void assertInfoValue(ConfigInfos infos, String name, String value, String... errors) {
|
||||
ConfigValueInfo info = findInfo(infos, name).configValue();
|
||||
assertEquals(name, info.name());
|
||||
assertEquals(value, info.value());
|
||||
|
|
|
@ -35,9 +35,8 @@ public class HasHeaderKey<R extends ConnectRecord<R>> implements Predicate<R>, V
|
|||
private static final String NAME_CONFIG = "name";
|
||||
public static final String OVERVIEW_DOC = "A predicate which is true for records with at least one header with the configured name.";
|
||||
public static final ConfigDef CONFIG_DEF = new ConfigDef()
|
||||
.define(NAME_CONFIG, ConfigDef.Type.STRING, ConfigDef.NO_DEFAULT_VALUE,
|
||||
new ConfigDef.NonEmptyString(), ConfigDef.Importance.MEDIUM,
|
||||
"The header name.");
|
||||
.define(NAME_CONFIG, ConfigDef.Type.STRING, ConfigDef.NO_DEFAULT_VALUE,
|
||||
new ConfigDef.NonEmptyString(), ConfigDef.Importance.MEDIUM, "The header name.");
|
||||
private String name;
|
||||
|
||||
@Override
|
||||
|
|
|
@ -50,11 +50,11 @@ import static org.junit.jupiter.api.Assertions.assertThrows;
|
|||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
|
||||
@ClusterTestDefaults(serverProperties = {
|
||||
@ClusterConfigProperty(key = ServerLogConfigs.AUTO_CREATE_TOPICS_ENABLE_CONFIG, value = "false"),
|
||||
@ClusterConfigProperty(key = TransactionLogConfig.TRANSACTIONS_TOPIC_PARTITIONS_CONFIG, value = "1"),
|
||||
@ClusterConfigProperty(key = TransactionLogConfig.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1"),
|
||||
@ClusterConfigProperty(key = TransactionLogConfig.TRANSACTIONS_TOPIC_MIN_ISR_CONFIG, value = "1"),
|
||||
@ClusterConfigProperty(key = TransactionStateManagerConfig.TRANSACTIONS_ABORT_TIMED_OUT_TRANSACTION_CLEANUP_INTERVAL_MS_CONFIG, value = "2000")
|
||||
@ClusterConfigProperty(key = ServerLogConfigs.AUTO_CREATE_TOPICS_ENABLE_CONFIG, value = "false"),
|
||||
@ClusterConfigProperty(key = TransactionLogConfig.TRANSACTIONS_TOPIC_PARTITIONS_CONFIG, value = "1"),
|
||||
@ClusterConfigProperty(key = TransactionLogConfig.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1"),
|
||||
@ClusterConfigProperty(key = TransactionLogConfig.TRANSACTIONS_TOPIC_MIN_ISR_CONFIG, value = "1"),
|
||||
@ClusterConfigProperty(key = TransactionStateManagerConfig.TRANSACTIONS_ABORT_TIMED_OUT_TRANSACTION_CLEANUP_INTERVAL_MS_CONFIG, value = "2000")
|
||||
})
|
||||
@ExtendWith(ClusterTestExtensions.class)
|
||||
public class AdminFenceProducersTest {
|
||||
|
|
|
@ -75,7 +75,7 @@ public class ClientTelemetryTest {
|
|||
types = Type.KRAFT,
|
||||
brokers = 3,
|
||||
serverProperties = {
|
||||
@ClusterConfigProperty(key = METRIC_REPORTER_CLASSES_CONFIG, value = "kafka.admin.ClientTelemetryTest$GetIdClientTelemetry"),
|
||||
@ClusterConfigProperty(key = METRIC_REPORTER_CLASSES_CONFIG, value = "kafka.admin.ClientTelemetryTest$GetIdClientTelemetry"),
|
||||
})
|
||||
public void testClientInstanceId(ClusterInstance clusterInstance) throws InterruptedException, ExecutionException {
|
||||
Map<String, Object> configs = new HashMap<>();
|
||||
|
|
|
@ -420,7 +420,7 @@ public class ConfigCommandTest {
|
|||
assertEquals("[[1, 2], [3, 4]]", addedProps.getProperty("nested"));
|
||||
}
|
||||
|
||||
public void testExpectedEntityTypeNames(List<String> expectedTypes, List<String> expectedNames, List<String> connectOpts, String...args) {
|
||||
public void testExpectedEntityTypeNames(List<String> expectedTypes, List<String> expectedNames, List<String> connectOpts, String... args) {
|
||||
ConfigCommand.ConfigCommandOptions createOpts = new ConfigCommand.ConfigCommandOptions(toArray(Arrays.asList(connectOpts.get(0), connectOpts.get(1), "--describe"), Arrays.asList(args)));
|
||||
createOpts.checkArgs();
|
||||
assertEquals(createOpts.entityTypes().toSeq(), seq(expectedTypes));
|
||||
|
@ -1434,7 +1434,7 @@ public class ConfigCommandTest {
|
|||
}
|
||||
|
||||
@SafeVarargs
|
||||
public static <K, V> Map<K, V> concat(Map<K, V>...maps) {
|
||||
public static <K, V> Map<K, V> concat(Map<K, V>... maps) {
|
||||
Map<K, V> res = new HashMap<>();
|
||||
Stream.of(maps)
|
||||
.map(Map::entrySet)
|
||||
|
|
|
@ -62,10 +62,10 @@ public class UserScramCredentialsCommandTest {
|
|||
}
|
||||
}
|
||||
|
||||
private ConfigCommandResult runConfigCommandViaBroker(String...args) {
|
||||
private ConfigCommandResult runConfigCommandViaBroker(String... args) {
|
||||
AtomicReference<OptionalInt> exitStatus = new AtomicReference<>(OptionalInt.empty());
|
||||
Exit.setExitProcedure((status, __) -> {
|
||||
exitStatus.set(OptionalInt.of((Integer) status));
|
||||
exitStatus.set(OptionalInt.of(status));
|
||||
throw new RuntimeException();
|
||||
});
|
||||
|
||||
|
|
|
@ -311,16 +311,16 @@ public class BootstrapControllersIntegrationTest {
|
|||
}
|
||||
|
||||
@ClusterTest(serverProperties = {
|
||||
@ClusterConfigProperty(key = StandardAuthorizer.SUPER_USERS_CONFIG, value = "User:ANONYMOUS"),
|
||||
@ClusterConfigProperty(key = AUTHORIZER_CLASS_NAME_CONFIG, value = "org.apache.kafka.metadata.authorizer.StandardAuthorizer")
|
||||
@ClusterConfigProperty(key = StandardAuthorizer.SUPER_USERS_CONFIG, value = "User:ANONYMOUS"),
|
||||
@ClusterConfigProperty(key = AUTHORIZER_CLASS_NAME_CONFIG, value = "org.apache.kafka.metadata.authorizer.StandardAuthorizer")
|
||||
})
|
||||
public void testAclsByControllers(ClusterInstance clusterInstance) throws Exception {
|
||||
testAcls(clusterInstance, true);
|
||||
}
|
||||
|
||||
@ClusterTest(serverProperties = {
|
||||
@ClusterConfigProperty(key = StandardAuthorizer.SUPER_USERS_CONFIG, value = "User:ANONYMOUS"),
|
||||
@ClusterConfigProperty(key = AUTHORIZER_CLASS_NAME_CONFIG, value = "org.apache.kafka.metadata.authorizer.StandardAuthorizer")
|
||||
@ClusterConfigProperty(key = StandardAuthorizer.SUPER_USERS_CONFIG, value = "User:ANONYMOUS"),
|
||||
@ClusterConfigProperty(key = AUTHORIZER_CLASS_NAME_CONFIG, value = "org.apache.kafka.metadata.authorizer.StandardAuthorizer")
|
||||
})
|
||||
public void testAcls(ClusterInstance clusterInstance) throws Exception {
|
||||
testAcls(clusterInstance, false);
|
||||
|
|
|
@ -58,10 +58,7 @@ versions += [
|
|||
// but currently, tests are failing in >=3.1.2. Therefore, we are temporarily using version 3.1.1.
|
||||
// The failing tests should be fixed under KAFKA-18089, allowing us to upgrade to >=3.1.2.
|
||||
caffeine: "3.1.1",
|
||||
// when updating checkstyle, check whether the exclusion of
|
||||
// CVE-2023-2976 and CVE-2020-8908 can be dropped from
|
||||
// gradle/resources/dependencycheck-suppressions.xml
|
||||
checkstyle: project.hasProperty('checkstyleVersion') ? checkstyleVersion : "8.36.2",
|
||||
checkstyle: project.hasProperty('checkstyleVersion') ? checkstyleVersion : "10.20.2",
|
||||
commonsCli: "1.4",
|
||||
commonsIo: "2.14.0", // ZooKeeper dependency. Do not use, this is going away.
|
||||
commonsValidator: "1.9.0",
|
||||
|
|
|
@ -23,17 +23,6 @@
|
|||
]]></notes>
|
||||
<cve>CVE-2023-35116</cve>
|
||||
</suppress>
|
||||
<suppress>
|
||||
<notes><![CDATA[
|
||||
This older version of Guava is only included in checkstyle.
|
||||
CVE-2023-2976 and CVE-2020-8908 are irrelevant for checkstyle,
|
||||
as it is not executed with elevated privileges.
|
||||
This suppression will no longer be needed when checkstyle
|
||||
is updated to 10.5.0 or later.
|
||||
]]></notes>
|
||||
<cve>CVE-2020-8908</cve>
|
||||
<cve>CVE-2023-2976</cve>
|
||||
</suppress>
|
||||
<suppress>
|
||||
<notes><![CDATA[
|
||||
Kafka does not use CgiServlet
|
||||
|
|
|
@ -316,10 +316,10 @@ public class ConfigurationControlManagerTest {
|
|||
assertEquals(ControllerResult.atomicOf(asList(new ApiMessageAndVersion(
|
||||
new ConfigRecord().setResourceType(BROKER.id()).setResourceName("0").
|
||||
setName("foo.bar").setValue("123"), CONFIG_RECORD.highestSupportedVersion()), new ApiMessageAndVersion(
|
||||
new ConfigRecord().setResourceType(BROKER.id()).setResourceName("0").
|
||||
setName("quux").setValue("456"), CONFIG_RECORD.highestSupportedVersion()), new ApiMessageAndVersion(
|
||||
new ConfigRecord().setResourceType(BROKER.id()).setResourceName("0").
|
||||
setName("broker.config.to.remove").setValue(null), CONFIG_RECORD.highestSupportedVersion())
|
||||
new ConfigRecord().setResourceType(BROKER.id()).setResourceName("0").
|
||||
setName("quux").setValue("456"), CONFIG_RECORD.highestSupportedVersion()), new ApiMessageAndVersion(
|
||||
new ConfigRecord().setResourceType(BROKER.id()).setResourceName("0").
|
||||
setName("broker.config.to.remove").setValue(null), CONFIG_RECORD.highestSupportedVersion())
|
||||
),
|
||||
toMap(entry(MYTOPIC, new ApiError(Errors.POLICY_VIOLATION,
|
||||
"Expected: AlterConfigPolicy.RequestMetadata(resource=ConfigResource(" +
|
||||
|
|
|
@ -867,10 +867,10 @@ public class QuorumControllerTest {
|
|||
Arrays.asList(new CreatableReplicaAssignment().
|
||||
setPartitionIndex(0).
|
||||
setBrokerIds(Arrays.asList(0, 1, 2)),
|
||||
new CreatableReplicaAssignment().
|
||||
setPartitionIndex(1).
|
||||
setBrokerIds(Arrays.asList(1, 2, 0))).
|
||||
iterator()))).iterator())),
|
||||
new CreatableReplicaAssignment().
|
||||
setPartitionIndex(1).
|
||||
setBrokerIds(Arrays.asList(1, 2, 0))).
|
||||
iterator()))).iterator())),
|
||||
Collections.singleton("foo")).get();
|
||||
fooId = fooData.topics().find("foo").topicId();
|
||||
active.allocateProducerIds(ANONYMOUS_CONTEXT,
|
||||
|
|
|
@ -1800,7 +1800,7 @@ public class ReplicationControlManagerTest {
|
|||
setReplicas(asList(0, 2, 1)),
|
||||
new ReassignablePartition().setPartitionIndex(2).
|
||||
setReplicas(asList(0, 2, 1)))),
|
||||
new ReassignableTopic().setName("bar"))));
|
||||
new ReassignableTopic().setName("bar"))));
|
||||
assertEquals(new AlterPartitionReassignmentsResponseData().
|
||||
setErrorMessage(null).setResponses(asList(
|
||||
new ReassignableTopicResponse().setName("foo").setPartitions(asList(
|
||||
|
@ -2151,27 +2151,21 @@ public class ReplicationControlManagerTest {
|
|||
setReplicas(asList(5, 6, 7)),
|
||||
new ReassignablePartition().setPartitionIndex(3).
|
||||
setReplicas(Collections.emptyList()))),
|
||||
new ReassignableTopic().setName("bar").setPartitions(singletonList(
|
||||
new ReassignableTopic().setName("bar").setPartitions(singletonList(
|
||||
new ReassignablePartition().setPartitionIndex(0).
|
||||
setReplicas(asList(1, 2, 3, 4, 0)))))));
|
||||
assertEquals(new AlterPartitionReassignmentsResponseData().
|
||||
setErrorMessage(null).setResponses(asList(
|
||||
new ReassignableTopicResponse().setName("foo").setPartitions(asList(
|
||||
new ReassignablePartitionResponse().setPartitionIndex(0).
|
||||
setErrorMessage(null),
|
||||
new ReassignablePartitionResponse().setPartitionIndex(1).
|
||||
setErrorMessage(null),
|
||||
new ReassignablePartitionResponse().setPartitionIndex(2).
|
||||
setErrorCode(INVALID_REPLICA_ASSIGNMENT.code()).
|
||||
setErrorMessage("The manual partition assignment includes broker 5, " +
|
||||
"but no such broker is registered."),
|
||||
new ReassignablePartitionResponse().setPartitionIndex(3).
|
||||
setErrorCode(INVALID_REPLICA_ASSIGNMENT.code()).
|
||||
setErrorMessage("The manual partition assignment includes an empty " +
|
||||
"replica list."))),
|
||||
new ReassignableTopicResponse().setName("bar").setPartitions(singletonList(
|
||||
new ReassignablePartitionResponse().setPartitionIndex(0).
|
||||
setErrorMessage(null))))),
|
||||
setErrorMessage(null).
|
||||
setResponses(asList(
|
||||
new ReassignableTopicResponse().setName("foo").setPartitions(asList(
|
||||
new ReassignablePartitionResponse().setPartitionIndex(0).setErrorMessage(null),
|
||||
new ReassignablePartitionResponse().setPartitionIndex(1).setErrorMessage(null),
|
||||
new ReassignablePartitionResponse().setPartitionIndex(2).setErrorCode(INVALID_REPLICA_ASSIGNMENT.code()).
|
||||
setErrorMessage("The manual partition assignment includes broker 5, but no such broker is registered."),
|
||||
new ReassignablePartitionResponse().setPartitionIndex(3).setErrorCode(INVALID_REPLICA_ASSIGNMENT.code()).
|
||||
setErrorMessage("The manual partition assignment includes an empty replica list."))),
|
||||
new ReassignableTopicResponse().setName("bar").setPartitions(singletonList(
|
||||
new ReassignablePartitionResponse().setPartitionIndex(0).setErrorMessage(null))))),
|
||||
alterResult.response());
|
||||
ctx.replay(alterResult.records());
|
||||
assertEquals(new PartitionRegistration.Builder().setReplicas(new int[] {1, 2, 4}).setIsr(new int[] {1, 2, 4}).
|
||||
|
|
|
@ -493,11 +493,11 @@ public class AssignmentsManagerTest {
|
|||
setPartitions(Collections.singletonList(
|
||||
new AssignReplicasToDirsRequestData.PartitionData().
|
||||
setPartitionIndex(2))),
|
||||
new AssignReplicasToDirsRequestData.TopicData().
|
||||
setTopicId(TOPIC_2).
|
||||
setPartitions(Collections.singletonList(
|
||||
new AssignReplicasToDirsRequestData.PartitionData().
|
||||
setPartitionIndex(5))))),
|
||||
new AssignReplicasToDirsRequestData.TopicData().
|
||||
setTopicId(TOPIC_2).
|
||||
setPartitions(Collections.singletonList(
|
||||
new AssignReplicasToDirsRequestData.PartitionData().
|
||||
setPartitionIndex(5))))),
|
||||
new AssignReplicasToDirsRequestData.DirectoryData().
|
||||
setId(DIR_3).
|
||||
setTopics(Collections.singletonList(
|
||||
|
|
|
@ -177,8 +177,8 @@ public class LogSegment implements Closeable {
|
|||
public void sanityCheck(boolean timeIndexFileNewlyCreated) throws IOException {
|
||||
if (offsetIndexFile().exists()) {
|
||||
// Resize the time index file to 0 if it is newly created.
|
||||
if (timeIndexFileNewlyCreated)
|
||||
timeIndex().resize(0);
|
||||
if (timeIndexFileNewlyCreated)
|
||||
timeIndex().resize(0);
|
||||
// Sanity checks for time index and offset index are skipped because
|
||||
// we will recover the segments above the recovery point in recoverLog()
|
||||
// in any case so sanity checking them here is redundant.
|
||||
|
|
|
@ -132,12 +132,12 @@ public class LogValidatorTest {
|
|||
|
||||
@ParameterizedTest
|
||||
@CsvSource({
|
||||
"0,gzip,none", "1,gzip,none", "2,gzip,none",
|
||||
"0,gzip,gzip", "1,gzip,gzip", "2,gzip,gzip",
|
||||
"0,snappy,gzip", "1,snappy,gzip", "2,snappy,gzip",
|
||||
"0,lz4,gzip", "1,lz4,gzip", "2,lz4,gzip",
|
||||
"2,none,none", "2,none,gzip",
|
||||
"2,zstd,gzip",
|
||||
"0,gzip,none", "1,gzip,none", "2,gzip,none",
|
||||
"0,gzip,gzip", "1,gzip,gzip", "2,gzip,gzip",
|
||||
"0,snappy,gzip", "1,snappy,gzip", "2,snappy,gzip",
|
||||
"0,lz4,gzip", "1,lz4,gzip", "2,lz4,gzip",
|
||||
"2,none,none", "2,none,gzip",
|
||||
"2,zstd,gzip",
|
||||
})
|
||||
public void checkOnlyOneBatch(Byte magic, String sourceCompression,
|
||||
String targetCompression) {
|
||||
|
@ -712,9 +712,9 @@ public class LogValidatorTest {
|
|||
|
||||
@ParameterizedTest
|
||||
@CsvSource({
|
||||
"0,gzip,gzip", "1,gzip,gzip",
|
||||
"0,lz4,lz4", "1,lz4,lz4",
|
||||
"0,snappy,snappy", "1,snappy,snappy",
|
||||
"0,gzip,gzip", "1,gzip,gzip",
|
||||
"0,lz4,lz4", "1,lz4,lz4",
|
||||
"0,snappy,snappy", "1,snappy,snappy",
|
||||
})
|
||||
public void checkInvalidChecksum(byte magic, String compressionName, String typeName) {
|
||||
Compression compression = Compression.of(compressionName).build();
|
||||
|
@ -813,10 +813,10 @@ public class LogValidatorTest {
|
|||
|
||||
@ParameterizedTest
|
||||
@CsvSource({
|
||||
"0,gzip,gzip", "1,gzip,gzip", "2,gzip,gzip",
|
||||
"0,lz4,lz4", "1,lz4,lz4", "2,lz4,lz4",
|
||||
"0,snappy,snappy", "1,snappy,snappy", "2,snappy,snappy",
|
||||
"2,zstd,zstd"
|
||||
"0,gzip,gzip", "1,gzip,gzip", "2,gzip,gzip",
|
||||
"0,lz4,lz4", "1,lz4,lz4", "2,lz4,lz4",
|
||||
"0,snappy,snappy", "1,snappy,snappy", "2,snappy,snappy",
|
||||
"2,zstd,zstd"
|
||||
})
|
||||
public void checkNoKeyCompactedTopic(byte magic, String compressionName, String typeName) {
|
||||
Compression codec = Compression.of(compressionName).build();
|
||||
|
|
|
@ -134,12 +134,12 @@ public class PageViewTypedDemo {
|
|||
@SuppressWarnings("DefaultAnnotationParam") // being explicit for the example
|
||||
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "_t")
|
||||
@JsonSubTypes({
|
||||
@JsonSubTypes.Type(value = PageView.class, name = "pv"),
|
||||
@JsonSubTypes.Type(value = UserProfile.class, name = "up"),
|
||||
@JsonSubTypes.Type(value = PageViewByRegion.class, name = "pvbr"),
|
||||
@JsonSubTypes.Type(value = WindowedPageViewByRegion.class, name = "wpvbr"),
|
||||
@JsonSubTypes.Type(value = RegionCount.class, name = "rc")
|
||||
})
|
||||
@JsonSubTypes.Type(value = PageView.class, name = "pv"),
|
||||
@JsonSubTypes.Type(value = UserProfile.class, name = "up"),
|
||||
@JsonSubTypes.Type(value = PageViewByRegion.class, name = "pvbr"),
|
||||
@JsonSubTypes.Type(value = WindowedPageViewByRegion.class, name = "wpvbr"),
|
||||
@JsonSubTypes.Type(value = RegionCount.class, name = "rc")
|
||||
})
|
||||
public interface JSONSerdeCompatible {
|
||||
|
||||
}
|
||||
|
|
|
@ -97,9 +97,10 @@ public class HighAvailabilityTaskAssignorIntegrationTest {
|
|||
|
||||
@ParameterizedTest
|
||||
@ValueSource(strings = {
|
||||
StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_NONE,
|
||||
StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_MIN_TRAFFIC,
|
||||
StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_BALANCE_SUBTOPOLOGY})
|
||||
StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_NONE,
|
||||
StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_MIN_TRAFFIC,
|
||||
StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_BALANCE_SUBTOPOLOGY
|
||||
})
|
||||
public void shouldScaleOutWithWarmupTasksAndInMemoryStores(final String rackAwareStrategy, final TestInfo testInfo) throws InterruptedException {
|
||||
// NB: this test takes at least a minute to run, because it needs a probing rebalance, and the minimum
|
||||
// value is one minute
|
||||
|
@ -108,9 +109,10 @@ public class HighAvailabilityTaskAssignorIntegrationTest {
|
|||
|
||||
@ParameterizedTest
|
||||
@ValueSource(strings = {
|
||||
StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_NONE,
|
||||
StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_MIN_TRAFFIC,
|
||||
StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_BALANCE_SUBTOPOLOGY})
|
||||
StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_NONE,
|
||||
StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_MIN_TRAFFIC,
|
||||
StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_BALANCE_SUBTOPOLOGY
|
||||
})
|
||||
public void shouldScaleOutWithWarmupTasksAndPersistentStores(final String rackAwareStrategy, final TestInfo testInfo) throws InterruptedException {
|
||||
// NB: this test takes at least a minute to run, because it needs a probing rebalance, and the minimum
|
||||
// value is one minute
|
||||
|
|
|
@ -982,10 +982,11 @@ public class QueryableStateIntegrationTest {
|
|||
streamOne,
|
||||
batch1,
|
||||
TestUtils.producerConfig(
|
||||
CLUSTER.bootstrapServers(),
|
||||
StringSerializer.class,
|
||||
StringSerializer.class,
|
||||
new Properties()),
|
||||
CLUSTER.bootstrapServers(),
|
||||
StringSerializer.class,
|
||||
StringSerializer.class,
|
||||
new Properties()
|
||||
),
|
||||
mockTime);
|
||||
|
||||
final KStream<String, String> s1 = builder.stream(streamOne);
|
||||
|
|
|
@ -36,13 +36,13 @@ import org.junit.platform.suite.api.Suite;
|
|||
*/
|
||||
@Suite
|
||||
@SelectClasses({
|
||||
CompositeReadOnlyKeyValueStoreTest.class,
|
||||
CompositeReadOnlyWindowStoreTest.class,
|
||||
CompositeReadOnlySessionStoreTest.class,
|
||||
GlobalStateStoreProviderTest.class,
|
||||
StreamThreadStateStoreProviderTest.class,
|
||||
WrappingStoreProviderTest.class,
|
||||
QueryableStateIntegrationTest.class,
|
||||
})
|
||||
CompositeReadOnlyKeyValueStoreTest.class,
|
||||
CompositeReadOnlyWindowStoreTest.class,
|
||||
CompositeReadOnlySessionStoreTest.class,
|
||||
GlobalStateStoreProviderTest.class,
|
||||
StreamThreadStateStoreProviderTest.class,
|
||||
WrappingStoreProviderTest.class,
|
||||
QueryableStateIntegrationTest.class,
|
||||
})
|
||||
public class StoreQuerySuite {
|
||||
}
|
||||
|
|
|
@ -26,7 +26,7 @@ import org.slf4j.Logger;
|
|||
|
||||
public final class StreamStreamJoinUtil {
|
||||
|
||||
private StreamStreamJoinUtil(){
|
||||
private StreamStreamJoinUtil() {
|
||||
}
|
||||
|
||||
public static <KIn, VIn, KOut, VOut> boolean skipRecord(
|
||||
|
|
|
@ -31,13 +31,13 @@ import org.junit.platform.suite.api.Suite;
|
|||
*/
|
||||
@Suite
|
||||
@SelectClasses({
|
||||
StreamTaskTest.class,
|
||||
StandbyTaskTest.class,
|
||||
GlobalStateTaskTest.class,
|
||||
TaskManagerTest.class,
|
||||
TaskMetricsTest.class,
|
||||
LegacyStickyTaskAssignorTest.class,
|
||||
StreamsPartitionAssignorTest.class,
|
||||
StreamTaskTest.class,
|
||||
StandbyTaskTest.class,
|
||||
GlobalStateTaskTest.class,
|
||||
TaskManagerTest.class,
|
||||
TaskMetricsTest.class,
|
||||
LegacyStickyTaskAssignorTest.class,
|
||||
StreamsPartitionAssignorTest.class,
|
||||
})
|
||||
public class TaskSuite {
|
||||
}
|
||||
|
|
|
@ -235,7 +235,7 @@ public class RocksDBStoreTest extends AbstractKeyValueStoreTest {
|
|||
}
|
||||
|
||||
public static class RocksDBConfigSetterWithUserProvidedStatistics implements RocksDBConfigSetter {
|
||||
public RocksDBConfigSetterWithUserProvidedStatistics(){}
|
||||
public RocksDBConfigSetterWithUserProvidedStatistics() {}
|
||||
|
||||
public void setConfig(final String storeName, final Options options, final Map<String, Object> configs) {
|
||||
lastStatistics = new Statistics();
|
||||
|
@ -306,7 +306,7 @@ public class RocksDBStoreTest extends AbstractKeyValueStoreTest {
|
|||
|
||||
|
||||
public static class RocksDBConfigSetterWithUserProvidedNewBlockBasedTableFormatConfig implements RocksDBConfigSetter {
|
||||
public RocksDBConfigSetterWithUserProvidedNewBlockBasedTableFormatConfig(){}
|
||||
public RocksDBConfigSetterWithUserProvidedNewBlockBasedTableFormatConfig() {}
|
||||
|
||||
public void setConfig(final String storeName, final Options options, final Map<String, Object> configs) {
|
||||
options.setTableFormatConfig(new BlockBasedTableConfig());
|
||||
|
@ -335,7 +335,7 @@ public class RocksDBStoreTest extends AbstractKeyValueStoreTest {
|
|||
}
|
||||
|
||||
public static class RocksDBConfigSetterWithUserProvidedNewPlainTableFormatConfig implements RocksDBConfigSetter {
|
||||
public RocksDBConfigSetterWithUserProvidedNewPlainTableFormatConfig(){}
|
||||
public RocksDBConfigSetterWithUserProvidedNewPlainTableFormatConfig() {}
|
||||
|
||||
public void setConfig(final String storeName, final Options options, final Map<String, Object> configs) {
|
||||
options.setTableFormatConfig(new PlainTableConfig());
|
||||
|
|
|
@ -133,6 +133,7 @@ public class TestUtils {
|
|||
|
||||
private WrapperRecorder recorder;
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
@Override
|
||||
public void configure(final Map<String, ?> configs) {
|
||||
if (configs.containsKey(PROCESSOR_WRAPPER_COUNTER_CONFIG)) {
|
||||
|
|
|
@ -116,7 +116,7 @@ public class ClusterTestExtensionsTest {
|
|||
@ClusterConfigProperty(key = "spam", value = "eggs"),
|
||||
@ClusterConfigProperty(key = "default.key", value = "overwrite.value")
|
||||
}, tags = {
|
||||
"default.display.key1", "default.display.key2"
|
||||
"default.display.key1", "default.display.key2"
|
||||
}),
|
||||
@ClusterTest(types = {Type.CO_KRAFT}, serverProperties = {
|
||||
@ClusterConfigProperty(key = "foo", value = "baz"),
|
||||
|
@ -126,7 +126,7 @@ public class ClusterTestExtensionsTest {
|
|||
@ClusterConfigProperty(key = "spam", value = "eggs"),
|
||||
@ClusterConfigProperty(key = "default.key", value = "overwrite.value")
|
||||
}, tags = {
|
||||
"default.display.key1", "default.display.key2"
|
||||
"default.display.key1", "default.display.key2"
|
||||
})
|
||||
})
|
||||
public void testClusterTests() throws ExecutionException, InterruptedException {
|
||||
|
|
|
@ -148,7 +148,7 @@ public class ToolsUtils {
|
|||
* @param <T> Element type.
|
||||
*/
|
||||
@SuppressWarnings("unchecked")
|
||||
public static <T> Set<T> minus(Set<T> set, T...toRemove) {
|
||||
public static <T> Set<T> minus(Set<T> set, T... toRemove) {
|
||||
Set<T> res = new HashSet<>(set);
|
||||
for (T t : toRemove)
|
||||
res.remove(t);
|
||||
|
|
|
@ -45,7 +45,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
|
|||
|
||||
@ExtendWith(ClusterTestExtensions.class)
|
||||
@ClusterTestDefaults(serverProperties = {
|
||||
@ClusterConfigProperty(key = ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG, value = "true"),
|
||||
@ClusterConfigProperty(key = ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG, value = "true"),
|
||||
})
|
||||
public class BrokerApiVersionsCommandTest {
|
||||
@ClusterTest
|
||||
|
|
|
@ -63,10 +63,10 @@ class MetadataQuorumCommandTest {
|
|||
|
||||
assertTrue(header.matches("NodeId\\s+DirectoryId\\s+LogEndOffset\\s+Lag\\s+LastFetchTimestamp\\s+LastCaughtUpTimestamp\\s+Status\\s+"));
|
||||
|
||||
if (cluster.type() == Type.CO_KRAFT)
|
||||
assertEquals(Math.max(cluster.config().numControllers(), cluster.config().numBrokers()), data.size());
|
||||
else
|
||||
assertEquals(cluster.config().numBrokers() + cluster.config().numControllers(), data.size());
|
||||
if (cluster.type() == Type.CO_KRAFT)
|
||||
assertEquals(Math.max(cluster.config().numControllers(), cluster.config().numBrokers()), data.size());
|
||||
else
|
||||
assertEquals(cluster.config().numBrokers() + cluster.config().numControllers(), data.size());
|
||||
|
||||
Pattern leaderPattern = Pattern.compile("\\d+\\s+\\S+\\s+\\d+\\s+\\d+\\s+-?\\d+\\s+-?\\d+\\s+Leader\\s*");
|
||||
assertTrue(leaderPattern.matcher(data.get(0)).find());
|
||||
|
|
|
@ -101,16 +101,16 @@ import static org.junit.jupiter.api.Assertions.assertFalse;
|
|||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
|
||||
@ClusterTestDefaults(brokers = 5, disksPerBroker = 3, serverProperties = {
|
||||
// shorter backoff to reduce test durations when no active partitions are eligible for fetching due to throttling
|
||||
@ClusterConfigProperty(key = REPLICA_FETCH_BACKOFF_MS_CONFIG, value = "100"),
|
||||
// Don't move partition leaders automatically.
|
||||
@ClusterConfigProperty(key = AUTO_LEADER_REBALANCE_ENABLE_CONFIG, value = "false"),
|
||||
@ClusterConfigProperty(key = REPLICA_LAG_TIME_MAX_MS_CONFIG, value = "1000"),
|
||||
@ClusterConfigProperty(id = 0, key = "broker.rack", value = "rack0"),
|
||||
@ClusterConfigProperty(id = 1, key = "broker.rack", value = "rack0"),
|
||||
@ClusterConfigProperty(id = 2, key = "broker.rack", value = "rack1"),
|
||||
@ClusterConfigProperty(id = 3, key = "broker.rack", value = "rack1"),
|
||||
@ClusterConfigProperty(id = 4, key = "broker.rack", value = "rack1"),
|
||||
// shorter backoff to reduce test durations when no active partitions are eligible for fetching due to throttling
|
||||
@ClusterConfigProperty(key = REPLICA_FETCH_BACKOFF_MS_CONFIG, value = "100"),
|
||||
// Don't move partition leaders automatically.
|
||||
@ClusterConfigProperty(key = AUTO_LEADER_REBALANCE_ENABLE_CONFIG, value = "false"),
|
||||
@ClusterConfigProperty(key = REPLICA_LAG_TIME_MAX_MS_CONFIG, value = "1000"),
|
||||
@ClusterConfigProperty(id = 0, key = "broker.rack", value = "rack0"),
|
||||
@ClusterConfigProperty(id = 1, key = "broker.rack", value = "rack0"),
|
||||
@ClusterConfigProperty(id = 2, key = "broker.rack", value = "rack1"),
|
||||
@ClusterConfigProperty(id = 3, key = "broker.rack", value = "rack1"),
|
||||
@ClusterConfigProperty(id = 4, key = "broker.rack", value = "rack1"),
|
||||
})
|
||||
@ExtendWith(ClusterTestExtensions.class)
|
||||
public class ReassignPartitionsCommandTest {
|
||||
|
@ -133,7 +133,7 @@ public class ReassignPartitionsCommandTest {
|
|||
}
|
||||
|
||||
@ClusterTests({
|
||||
@ClusterTest(types = {Type.KRAFT, Type.CO_KRAFT}, metadataVersion = IBP_3_3_IV0)
|
||||
@ClusterTest(types = {Type.KRAFT, Type.CO_KRAFT}, metadataVersion = IBP_3_3_IV0)
|
||||
})
|
||||
public void testReassignmentWithAlterPartitionDisabled() throws Exception {
|
||||
// Test reassignment when the IBP is on an older version which does not use
|
||||
|
@ -145,11 +145,11 @@ public class ReassignPartitionsCommandTest {
|
|||
}
|
||||
|
||||
@ClusterTests({
|
||||
@ClusterTest(types = {Type.KRAFT, Type.CO_KRAFT}, serverProperties = {
|
||||
@ClusterConfigProperty(id = 1, key = INTER_BROKER_PROTOCOL_VERSION_CONFIG, value = "3.3-IV0"),
|
||||
@ClusterConfigProperty(id = 2, key = INTER_BROKER_PROTOCOL_VERSION_CONFIG, value = "3.3-IV0"),
|
||||
@ClusterConfigProperty(id = 3, key = INTER_BROKER_PROTOCOL_VERSION_CONFIG, value = "3.3-IV0"),
|
||||
})
|
||||
@ClusterTest(types = {Type.KRAFT, Type.CO_KRAFT}, serverProperties = {
|
||||
@ClusterConfigProperty(id = 1, key = INTER_BROKER_PROTOCOL_VERSION_CONFIG, value = "3.3-IV0"),
|
||||
@ClusterConfigProperty(id = 2, key = INTER_BROKER_PROTOCOL_VERSION_CONFIG, value = "3.3-IV0"),
|
||||
@ClusterConfigProperty(id = 3, key = INTER_BROKER_PROTOCOL_VERSION_CONFIG, value = "3.3-IV0"),
|
||||
})
|
||||
})
|
||||
public void testReassignmentCompletionDuringPartialUpgrade() throws Exception {
|
||||
// Test reassignment during a partial upgrade when some brokers are relying on
|
||||
|
|
|
@ -41,8 +41,8 @@ public final class Kibosh {
|
|||
|
||||
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
|
||||
@JsonSubTypes({
|
||||
@JsonSubTypes.Type(value = KiboshFilesUnreadableFaultSpec.class, name = "unreadable"),
|
||||
})
|
||||
@JsonSubTypes.Type(value = KiboshFilesUnreadableFaultSpec.class, name = "unreadable"),
|
||||
})
|
||||
public abstract static class KiboshFaultSpec {
|
||||
@Override
|
||||
public final boolean equals(Object o) {
|
||||
|
|
|
@ -30,11 +30,11 @@ import com.fasterxml.jackson.databind.node.NullNode;
|
|||
*/
|
||||
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "state")
|
||||
@JsonSubTypes({
|
||||
@JsonSubTypes.Type(value = TaskPending.class, name = TaskStateType.Constants.PENDING_VALUE),
|
||||
@JsonSubTypes.Type(value = TaskRunning.class, name = TaskStateType.Constants.RUNNING_VALUE),
|
||||
@JsonSubTypes.Type(value = TaskStopping.class, name = TaskStateType.Constants.STOPPING_VALUE),
|
||||
@JsonSubTypes.Type(value = TaskDone.class, name = TaskStateType.Constants.DONE_VALUE)
|
||||
})
|
||||
@JsonSubTypes.Type(value = TaskPending.class, name = TaskStateType.Constants.PENDING_VALUE),
|
||||
@JsonSubTypes.Type(value = TaskRunning.class, name = TaskStateType.Constants.RUNNING_VALUE),
|
||||
@JsonSubTypes.Type(value = TaskStopping.class, name = TaskStateType.Constants.STOPPING_VALUE),
|
||||
@JsonSubTypes.Type(value = TaskDone.class, name = TaskStateType.Constants.DONE_VALUE)
|
||||
})
|
||||
public abstract class TaskState extends Message {
|
||||
private final TaskSpec spec;
|
||||
|
||||
|
|
|
@ -28,7 +28,7 @@ import com.fasterxml.jackson.databind.JsonNode;
|
|||
*/
|
||||
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
|
||||
@JsonSubTypes(value = {
|
||||
@JsonSubTypes.Type(value = TimestampRecordProcessor.class, name = "timestamp"),
|
||||
@JsonSubTypes.Type(value = TimestampRecordProcessor.class, name = "timestamp"),
|
||||
})
|
||||
public interface RecordProcessor {
|
||||
void processRecords(ConsumerRecords<byte[], byte[]> consumerRecords);
|
||||
|
|
Loading…
Reference in New Issue